package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.atguigu.app.func.DimAsyncFunction;
import com.atguigu.bean.TradeSkuOrderBean;
import com.atguigu.common.Constant;
import com.atguigu.utils.DorisUtil;
import com.atguigu.utils.KafkaUtil;
import com.atguigu.utils.WindowUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

//数据流:web/app -> Nginx -> 业务服务器(Mysql) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp(关联维表) -> Doris
//程  序:Mock -> Mysql -> Maxwell -> Kafka(ZK) -> Dwd04_TradeOrderDetail2 -> Kafka(ZK) -> Dws09_TradeSkuOrderWindow(HBase HDFS ZK Redis) -> Doris
public class Dws09_TradeSkuOrderWindow_02 {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启CK
        env.enableCheckpointing(10000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(20000L);
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //checkpointConfig.setCheckpointInterval(10000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
        checkpointConfig.setMaxConcurrentCheckpoints(2);
        //默认是int类型的最大值
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        env.setStateBackend(new HashMapStateBackend());

        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //2.消费Kafka DWD层 下单明细主题数据创建流
        DataStreamSource<String> kafkaDS = env.fromSource(KafkaUtil.getKafkaSource(Constant.TOPIC_DWD_TRADE_ORDER_DETAIL, "dws_trade_sku_order_230315"), WatermarkStrategy.noWatermarks(), "kafka-source");

        //3.过滤Null(""),并转换为JSON对象,提取事件时间
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                if (!"".equals(value)) {
                    out.collect(JSON.parseObject(value));
                }
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
            @Override
            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                return element.getLong("create_time");
            }
        }));

        //4.按照order_detail_id分组去重(left join),同时将其转换为JavaBean对象  方案二(只需要用到左表数据)
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderDS = jsonObjDS
                .keyBy(json -> json.getString("id"))
                .flatMap(new RichFlatMapFunction<JSONObject, TradeSkuOrderBean>() {

                    private ValueState<String> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
                                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                                .build();
                        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("value-state", String.class);
                        stateDescriptor.enableTimeToLive(ttlConfig);
                        valueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void flatMap(JSONObject value, Collector<TradeSkuOrderBean> out) throws Exception {

                        String state = valueState.value();
                        if (state == null) {
                            valueState.update("1");

                            BigDecimal splitCouponAmount = value.getBigDecimal("split_coupon_amount");
                            if (splitCouponAmount == null) {
                                splitCouponAmount = new BigDecimal("0.0");
                            }

                            out.collect(TradeSkuOrderBean.builder()
                                    .skuId(value.getString("sku_id"))
                                    .skuName(value.getString("sku_name"))
                                    .curDate(value.getString("create_time").split(" ")[0])
                                    .orderAmount(value.getBigDecimal("split_total_amount"))
                                    .couponAmount(splitCouponAmount)
                                    .activityAmount(value.getBigDecimal("split_activity_amount") == null ? new BigDecimal("0.0") : value.getBigDecimal("split_activity_amount"))
                                    .originalAmount(value.getBigDecimal("split_original_amount"))
                                    .build());
                        }
                    }
                });

        //5.按照sku_id分组开窗聚合
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceDS = tradeSkuOrderDS.keyBy(TradeSkuOrderBean::getSkuId)
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeSkuOrderBean>() {
                    @Override
                    public TradeSkuOrderBean reduce(TradeSkuOrderBean value1, TradeSkuOrderBean value2) throws Exception {
                        value1.setOriginalAmount(value1.getOriginalAmount().add(value2.getOriginalAmount()));
                        value1.setOrderAmount(value1.getOrderAmount().add(value2.getOrderAmount()));
                        value1.setActivityAmount(value1.getActivityAmount().add(value2.getActivityAmount()));
                        value1.setCouponAmount(value1.getCouponAmount().add(value2.getCouponAmount()));
                        return value1;
                    }
                }, new WindowFunction<TradeSkuOrderBean, TradeSkuOrderBean, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<TradeSkuOrderBean> input, Collector<TradeSkuOrderBean> out) throws Exception {
                        WindowUtil.setWindowTime(window, input, out);
                    }
                });

        reduceDS.print("reduceDS>>>>>>");

        //6.关联维表,补充维度信息
        //6.1 关联SKU
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderWithSkuDS = AsyncDataStream.unorderedWait(reduceDS, new DimAsyncFunction<TradeSkuOrderBean>("dim_sku_info") {
            @Override
            public String getKey(TradeSkuOrderBean input) {
                return input.getSkuId();
            }

            @Override
            public void join(TradeSkuOrderBean value, JSONObject dimInfo) {
                value.setSpuId(dimInfo.getString("spu_id"));
                value.setTrademarkId(dimInfo.getString("tm_id"));
                value.setCategory3Id(dimInfo.getString("category3_id"));
            }
        }, 100, TimeUnit.SECONDS);

        //6.2 关联SPU
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderWithSpuDS = AsyncDataStream.unorderedWait(tradeSkuOrderWithSkuDS, new DimAsyncFunction<TradeSkuOrderBean>("dim_spu_info") {
            @Override
            public String getKey(TradeSkuOrderBean input) {
                return input.getSpuId();
            }

            @Override
            public void join(TradeSkuOrderBean value, JSONObject dimInfo) {
                value.setSpuName(dimInfo.getString("spu_name"));
            }
        }, 100, TimeUnit.SECONDS);

        //6.3 关联TM维表
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderWithTmDS = AsyncDataStream.unorderedWait(tradeSkuOrderWithSpuDS, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_trademark") {
            @Override
            public String getKey(TradeSkuOrderBean input) {
                return input.getTrademarkId();
            }

            @Override
            public void join(TradeSkuOrderBean value, JSONObject dimInfo) {
                value.setTrademarkName(dimInfo.getString("tm_name"));
            }
        }, 100, TimeUnit.SECONDS);

        //6.4 关联Category3
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderWithC3DS = AsyncDataStream.unorderedWait(tradeSkuOrderWithTmDS, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_category3") {
            @Override
            public String getKey(TradeSkuOrderBean input) {
                return input.getCategory3Id();
            }

            @Override
            public void join(TradeSkuOrderBean value, JSONObject dimInfo) {
                value.setCategory3Name(dimInfo.getString("name"));
                value.setCategory2Id(dimInfo.getString("category2_id"));
            }
        }, 100, TimeUnit.SECONDS);

        //6.5 关联Category2
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderC2DS = AsyncDataStream.unorderedWait(tradeSkuOrderWithC3DS, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_category2") {
            @Override
            public String getKey(TradeSkuOrderBean input) {
                return input.getCategory2Id();
            }

            @Override
            public void join(TradeSkuOrderBean value, JSONObject dimInfo) {
                value.setCategory2Name(dimInfo.getString("name"));
                value.setCategory1Id(dimInfo.getString("category1_id"));
            }
        }, 100, TimeUnit.SECONDS);

        //6.6 关联Category1
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderWithC1DS = AsyncDataStream.unorderedWait(tradeSkuOrderC2DS, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_category1") {
            @Override
            public String getKey(TradeSkuOrderBean input) {
                return input.getCategory1Id();
            }

            @Override
            public void join(TradeSkuOrderBean value, JSONObject dimInfo) {
                value.setCategory1Name(dimInfo.getString("name"));
            }
        }, 100, TimeUnit.SECONDS);

        tradeSkuOrderWithC1DS.print("category1DS>>>>>>>>");

        //7.将数据写出到Doris
        tradeSkuOrderWithC1DS.print("resultDS>>>>>>>>");
        tradeSkuOrderWithC1DS.map(bean -> {
                    SerializeConfig config = new SerializeConfig();
                    config.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;  // 转成json的时候, 属性名使用下划线
                    return JSON.toJSONString(bean, config);
                })
                .sinkTo(DorisUtil.getDorisSink("dws_trade_sku_order_window"));

        //8.启动
        env.execute("");

    }

}
